Workflow Advanced
In Workflow, we saw how to easily create simple stages and link them together to build a pipeline. Now it's time to look at the complete set of workflow features for creating more complex stages.
Data format
First, let's look at how data is represented inside a stage. Here a, b, … are column names provided by the user and used by stages:
  | scn 0     | scn 1     | scn …     |
t | a | b | … | a | b | … | a | b | … |
0 | _ | _ | _ | _ | _ | _ | _ | _ | _ |
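As a sketch of this layout, assuming the data lives in a pandas DataFrame whose columns form a two-level MultiIndex (scenario, column name), the table above can be built like this. The helper `make_timeline` is hypothetical and not part of Hadar:

```python
import numpy as np
import pandas as pd


def make_timeline(nb_scn: int, names: list, nb_t: int) -> pd.DataFrame:
    """Hypothetical helper: one row per time step t,
    one column per (scenario, column name) pair."""
    # Level 0 holds the scenario number, level 1 the user column name
    columns = pd.MultiIndex.from_product([range(nb_scn), names])
    data = np.zeros((nb_t, len(columns)))
    return pd.DataFrame(data, index=pd.RangeIndex(nb_t, name='t'), columns=columns)


tl = make_timeline(nb_scn=3, names=['a', 'b'], nb_t=4)
print(tl.shape)              # (4, 6)
print(list(tl[1].columns))   # ['a', 'b'], the columns of scenario 1 only
```

Selecting `tl[1]` drops the scenario level, which is why a stage can work on a single scenario using plain column names.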
Pipelines are also flexible enough to accept user input without scenarios, in the format below; such input is standardized by adding a default scenario 0.
t | a | b | … |
0 | _ | _ | _ |
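The standardization step can be sketched in pandas, assuming the multi-scenario layout is a (scenario, column) MultiIndex. The helper `standardize` is hypothetical; Hadar's own implementation may differ:

```python
import numpy as np
import pandas as pd


def standardize(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: put flat input columns under a default scenario 0."""
    if isinstance(df.columns, pd.MultiIndex):
        return df  # already has a scenario level, leave it as is
    # Passing a dict to pd.concat turns its keys into a new outer column level
    return pd.concat({0: df}, axis=1)


flat = pd.DataFrame({'a': np.arange(3.0), 'b': np.ones(3)})
standardized = standardize(flat)
print(standardized.columns.nlevels)   # 2
print(list(standardized[0].columns))  # ['a', 'b']
```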
Constraints on input and output
As you saw above, data contains scenarios, and each scenario contains columns with generic names. These names become a constraint: some stages expect to receive specific column names, or produce new columns with other names. Hadar provides a mechanism called Plug to handle this complexity. You have already seen hd.FreePlug, which means the stage has no constraint: it doesn't expect any particular input and doesn't produce any specific column.

For example, if you just need to double the data, you can create a Stage with a FreePlug:
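import hadar as hd
import numpy as np
import pandas as pd

class Twice(hd.Stage):
    def __init__(self):
        # FreePlug: no constraint on input or output column names
        hd.Stage.__init__(self, plug=hd.FreePlug())

    def _process_timeline(self, tl):
        # Applied to the whole dataframe, all scenarios at once
        return tl * 2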
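Because a FreePlug stage has no column constraint, the body of `_process_timeline` is plain pandas. The sketch below runs the same `tl * 2` logic outside Hadar on a hand-built two-scenario timeline (the (scenario, column) MultiIndex layout is an assumption), showing that column names and scenarios pass through untouched:

```python
import pandas as pd

# Two scenarios (0 and 1), each with arbitrary column names 'a' and 'b'
columns = pd.MultiIndex.from_product([[0, 1], ['a', 'b']])
tl = pd.DataFrame([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]], columns=columns)


def process_timeline(tl: pd.DataFrame) -> pd.DataFrame:
    # Same body as Twice._process_timeline: element-wise doubling
    return tl * 2


out = process_timeline(tl)
print(out.columns.equals(tl.columns))  # True: no column renamed or dropped
print(out[1]['b'].tolist())            # [8.0, 16.0]
```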
That's simple, but sometimes a stage strictly expects certain column names to process the timeline. In this case, use hd.RestrictedPlug(inputs, outputs): inputs declares the column names the stage needs to perform its computation, and outputs declares the new column names created by that computation.
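To make the inputs/outputs contract concrete, here is a minimal sketch of what a restricted plug declares and checks. The class `PlugSketch` and its `check` method are hypothetical, not Hadar's implementation; in particular, tolerating extra, undeclared input columns is an assumption:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlugSketch:
    """Hypothetical stand-in for hd.RestrictedPlug, not Hadar's class."""
    inputs: List[str] = field(default_factory=list)   # columns the stage needs
    outputs: List[str] = field(default_factory=list)  # columns the stage creates

    def check(self, columns: List[str]) -> List[str]:
        """Raise if a required input is missing, else return the output columns."""
        # Assumption: extra, undeclared columns are tolerated
        missing = [c for c in self.inputs if c not in columns]
        if missing:
            raise ValueError(f'missing input columns {missing}, received {columns}')
        return self.outputs


wind_plug = PlugSketch(inputs=['mean', 'std'], outputs=['wind'])
print(wind_plug.check(['mean', 'std']))  # ['wind']
try:
    wind_plug.check(['NOT-mean', 'std'])
except ValueError as e:
    # ValueError: missing input columns ['mean'], received ['NOT-mean', 'std']
    print('ValueError:', e)
```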
Once column names matter, we often need to apply the computation scenario by scenario rather than on the whole dataframe. To handle this, Hadar provides hd.FocusStage, which asks you to implement _process_scenarios(self, nb_scn, tl) to process the timeline tl of one scenario at a time.
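Conceptually, a FocusStage splits the timeline by scenario, calls the per-scenario method on each piece, and reassembles the results. The helper `apply_by_scenario` below is a hypothetical sketch of that idea in plain pandas, assuming the (scenario, column) MultiIndex layout; it is not Hadar's implementation:

```python
import numpy as np
import pandas as pd


def apply_by_scenario(tl: pd.DataFrame, func) -> pd.DataFrame:
    """Hypothetical sketch: call func(nb_scn, scenario_df) once per scenario
    and reassemble the results under the scenario level."""
    results = {}
    for nb_scn in tl.columns.get_level_values(0).unique():
        # tl[nb_scn] drops the scenario level, leaving plain column names
        results[nb_scn] = func(nb_scn, tl[nb_scn])
    return pd.concat(results, axis=1)


columns = pd.MultiIndex.from_product([[0, 1], ['mean']])
tl = pd.DataFrame(np.ones((3, 2)), columns=columns)

# The function receives the scenario number, so it can treat each scenario differently
out = apply_by_scenario(tl, lambda nb_scn, df: df + nb_scn)
print(out[1]['mean'].tolist())  # [2.0, 2.0, 2.0]
```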
In the last example, we created a Stage that generates wind power by applying a simple uniform random generation. Now we want a more precise generation: whereas the previous stage only used a max variable to scale a uniform random draw, we now use two variables, mean and std, to draw from a normal distribution.
class Wind(hd.FocusStage):  # Compute will be done scenario by scenario, so we use FocusStage
    def __init__(self):
        # Use RestrictedPlug to enforce the constraint: needs 'mean' and 'std', produces 'wind'
        hd.FocusStage.__init__(self, plug=hd.RestrictedPlug(inputs=['mean', 'std'], outputs=['wind']))

    def _process_scenarios(self, nb_scn, tl):
        # Normal draw per time step: mean + N(0, 1) * std, returned as the declared 'wind' column
        return (tl['mean'] + np.random.randn(tl.shape[0]) * tl['std']).to_frame(name='wind')
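The formula in `_process_scenarios` relies on the fact that if Z ~ N(0, 1), then mean + std·Z ~ N(mean, std²). A quick numerical check in plain NumPy/pandas, with constant mean and std chosen only for illustration and a seeded `np.random.default_rng` instead of the global `np.random.randn`:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=42)  # seeded so the run is reproducible
n = 100_000
tl = pd.DataFrame({'mean': np.full(n, 1000.0), 'std': np.full(n, 200.0)})

# Same formula as Wind._process_scenarios: mean + N(0, 1) * std
wind = tl['mean'] + rng.standard_normal(n) * tl['std']

print(round(wind.mean()), round(wind.std()))  # close to 1000 and 200
```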
Wind can now be plugged: upstream stages have to provide mean and std, and downstream stages should consume wind. Stages such as hd.Clip and hd.RepeatScenario use a free plug, so you can plug them anywhere:
pipe = hd.RepeatScenario(5) + Wind() + hd.Clip(lower=0)  # Make sure no negative production is generated
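To see why these two stages need no column constraint, here is a rough pandas equivalent of each. Both functions are hypothetical re-implementations written for illustration, not Hadar's source, and `repeat_scenario` only handles flat input (an assumption): repeating copies every column into new scenarios, and clipping bounds every value, whatever the column is called.

```python
import pandas as pd


def repeat_scenario(df: pd.DataFrame, n: int) -> pd.DataFrame:
    # Assumption: flat input is copied n times, one copy per scenario 0..n-1
    return pd.concat({scn: df for scn in range(n)}, axis=1)


def clip(df: pd.DataFrame, lower: float) -> pd.DataFrame:
    # DataFrame.clip works element-wise, independent of column names
    return df.clip(lower=lower)


df = pd.DataFrame({'wind': [-5.0, 3.0, 10.0]})
out = clip(repeat_scenario(df, 3), lower=0)
print(out.shape)                # (3, 3)
print(out[2]['wind'].tolist())  # [0.0, 3.0, 10.0]
```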
But if you try to plug Fault after them, an error is raised, because Fault expects a quantity column:
try:
    pipe = hd.RepeatScenario(5) + Wind() + hd.Clip(lower=0) \
        + hd.Fault(occur_freq=0.01, loss=100, downtime_min=1, downtime_max=10)
except ValueError as e:
    print('ValueError:', e)
ValueError: Pipeline can't be added current outputs are ['wind'] and Fault has input ['quantity']
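The error arises at link time, before any data flows. Below is a hypothetical sketch of such a check, assuming free plugs pass column names through unchanged and a restricted stage replaces the current columns with its outputs. Fault's outputs are assumed here to be ['quantity'], and `check_pipeline` is an illustrative name, not Hadar's API:

```python
from typing import List, Optional, Tuple

# (stage name, inputs, outputs); inputs=None marks a free plug (assumed semantics)
StageSpec = Tuple[str, Optional[List[str]], Optional[List[str]]]


def check_pipeline(stages: List[StageSpec]) -> Optional[List[str]]:
    """Hypothetical link-time check: track the column names flowing between stages."""
    current: Optional[List[str]] = None  # unknown until a restricted stage sets it
    for name, inputs, outputs in stages:
        if inputs is None:
            continue  # free plug: accepts anything and renames nothing
        if current is not None and not set(inputs) <= set(current):
            raise ValueError(f'current outputs are {current} and {name} has input {inputs}')
        current = outputs
    return current


pipeline = [('RepeatScenario', None, None), ('Wind', ['mean', 'std'], ['wind']),
            ('Clip', None, None), ('Fault', ['quantity'], ['quantity'])]
try:
    check_pipeline(pipeline)
except ValueError as e:
    # ValueError: current outputs are ['wind'] and Fault has input ['quantity']
    print('ValueError:', e)

# Inserting a rename stage between Clip and Fault makes the chain valid
fixed = pipeline[:3] + [('Rename', ['wind'], ['quantity'])] + pipeline[3:]
print(check_pipeline(fixed))  # ['quantity']
```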
In this case, you can use hd.Rename to map a column to the name the next stage expects. To summarize, the pipeline:
1. copies the data 5 times into new scenarios
2. applies random generation to each scenario
3. clips data below 0 (negative production doesn't exist)
4. renames the data column from wind to quantity
5. generates random faults for each scenario
pipe = hd.RepeatScenario(5) + Wind() + hd.Clip(lower=0) \
+ hd.Rename(wind='quantity') + hd.Fault(occur_freq=0.01, loss=100, downtime_min=1, downtime_max=10)
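Step 4 can be sketched in plain pandas, assuming the multi-scenario layout from the Data format section: only the inner level of the column index is renamed, and scenario numbers are left alone. Whether hd.Rename works exactly this way is an assumption:

```python
import pandas as pd

# Two scenarios holding a 'wind' column, as produced by the first three steps
columns = pd.MultiIndex.from_product([[0, 1], ['wind']])
tl = pd.DataFrame([[1.0, 2.0], [3.0, 4.0]], columns=columns)

# level=1 renames the column-name level only; the scenario level is untouched
renamed = tl.rename(columns={'wind': 'quantity'}, level=1)
print(renamed[1]['quantity'].tolist())  # [2.0, 4.0]
```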
Checks are performed when stages are linked together, and also when the user provides input data. The lines below raise an error because the input doesn't have a mean column:
t = np.linspace(0, 4*3.14, 168)
try:
    i = pd.DataFrame({'NOT-mean': np.sin(t) * 1000 + 1000, 'std': np.sin(t*2) * 200 + 200})
    o = pipe(i)
except ValueError as e:
    print('ValueError:', e)
ValueError: Pipeline accept ['mean', 'std'] in input, but receive ['NOT-mean' 'std']
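The input check has to read column names from either flat or multi-scenario data. A hypothetical sketch, with `input_columns` and `check_input` as illustrative names (not Hadar's API) and the (scenario, column) MultiIndex layout assumed:

```python
import numpy as np
import pandas as pd


def input_columns(df: pd.DataFrame) -> list:
    """Column names seen by the first stage, for flat or multi-scenario input."""
    if isinstance(df.columns, pd.MultiIndex):
        return list(df.columns.get_level_values(1).unique())
    return list(df.columns)


def check_input(df: pd.DataFrame, expected: list) -> None:
    # Hypothetical check, assumed to resemble the pipeline's input validation
    received = input_columns(df)
    if not set(expected) <= set(received):
        raise ValueError(f'Pipeline accepts {expected} in input, but received {received}')


t = np.linspace(0, 4 * np.pi, 168)
bad = pd.DataFrame({'NOT-mean': np.sin(t) * 1000 + 1000, 'std': np.sin(t * 2) * 200 + 200})
try:
    check_input(bad, ['mean', 'std'])
except ValueError as e:
    # ValueError: Pipeline accepts ['mean', 'std'] in input, but received ['NOT-mean', 'std']
    print('ValueError:', e)
```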
i = pd.DataFrame({'mean': np.sin(t) * 1000 + 1000, 'std': np.sin(t*2) * 200 + 200})
o = pipe(i.copy())
import plotly.graph_objects as go

fig = go.Figure()
# Input: mean and the mean ± std envelope
fig.add_trace(go.Scatter(x=t, y=i['mean'], name='mean'))
fig.add_trace(go.Scatter(x=t, y=i['mean'] + i['std'], name='std+', line=dict(color='red', dash='dash')))
fig.add_trace(go.Scatter(x=t, y=i['mean'] - i['std'], name='std-', line=dict(color='red', dash='dash')))
# Output: one trace per generated scenario
for n in range(5):
    fig.add_trace(go.Scatter(x=t, y=o[n]['quantity'], name='wind %d' % n, line=dict(color='rgba(100, 100, 100, 0.5)')))
fig.show()
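With many scenarios, one trace per scenario gets cluttered; a common alternative is to summarize them as a cross-scenario envelope. A sketch using pandas `DataFrame.xs`, on a randomly generated stand-in for `o` (the values are illustrative only, not the pipeline's result), assuming the (scenario, column) MultiIndex layout:

```python
import numpy as np
import pandas as pd

# Stand-in for the pipeline output: 5 scenarios with a 'quantity' column
rng = np.random.default_rng(0)
columns = pd.MultiIndex.from_product([range(5), ['quantity']])
o = pd.DataFrame(rng.uniform(0, 2000, size=(168, 5)), columns=columns)

# xs picks 'quantity' in every scenario: one column per scenario
q = o.xs('quantity', axis=1, level=1)
band = pd.DataFrame({'min': q.min(axis=1), 'mean': q.mean(axis=1), 'max': q.max(axis=1)})
print(band.head())
```

The `min` and `max` columns could then be drawn as a shaded band instead of five separate traces.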